@Will-Lo Will-Lo commented Aug 16, 2024

Dear Gobblin maintainers,

Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!

JIRA

Description

  • Here are some details about my PR, including screenshots (if applicable):
    Gobblin YARN Application Launcher lacks some functionality that MRJobLauncher provides. One of the biggest gaps in feature parity is jar caching: MRJobLauncher creates a monthly cache that is automatically cleaned up by subsequent executions roughly two months later.

YARN/MR requires uploading jars to HDFS. This step can be quite slow (~15 mins for a sizeable job to upload all of its jars), and since many jobs share the same jars, it makes sense to cache them together and only provide YARN the shared path.

We also want to ensure that SNAPSHOT jars and other files are not uploaded to the cache, since, unlike versioned jars on Artifactory, they are not immutable.

This PR implements jar caching through 2 configurations:

gobblin.yarn.jar.cache.enabled
gobblin.yarn.jar.cache.dir

If gobblin.yarn.jar.cache.enabled=true, the launcher looks for the directory defined in gobblin.yarn.jar.cache.dir. SNAPSHOT jars and other files are expected to be stored in a directory unique to the execution, so that they are not shared with other concurrent executions; only jars stored in the jar cache are shared.
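
As a rough illustration of how the two settings interact, here is a minimal sketch that uses plain java.util.Properties instead of Gobblin's Config object. The class JarCacheSettings, the method resolveCacheDir, the "false" default, and the example path are all assumptions made for this sketch; only the two key names come from the description above.

```java
import java.util.Optional;
import java.util.Properties;

// Hypothetical helper, not part of the PR: resolves the shared jar cache
// directory from the two settings described above.
class JarCacheSettings {
  static final String ENABLED_KEY = "gobblin.yarn.jar.cache.enabled";
  static final String DIR_KEY = "gobblin.yarn.jar.cache.dir";

  // Returns the cache directory only when caching is enabled and a directory is set.
  // Defaulting to "false" is an assumption; the PR's actual default is not shown.
  static Optional<String> resolveCacheDir(Properties props) {
    boolean enabled = Boolean.parseBoolean(props.getProperty(ENABLED_KEY, "false"));
    if (!enabled) {
      return Optional.empty();
    }
    return Optional.ofNullable(props.getProperty(DIR_KEY));
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty(ENABLED_KEY, "true");
    props.setProperty(DIR_KEY, "/tmp/gobblin-jar-cache"); // illustrative path only
    System.out.println(resolveCacheDir(props));
  }
}
```

With the flag unset or false, the sketch returns an empty Optional, which a caller could treat as "upload everything to the per-execution directory".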

Tests

  • My PR adds the following unit tests OR does not need testing for this extremely good reason:
    Testing shows that this saves approximately 10 minutes of bootstrap time per job.

Commits

  • My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

@Will-Lo Will-Lo changed the title from "Cache temporal jars" to "[GOBBLIN-2135] Cache Gobblin YARN application jars" Aug 19, 2024
    LOG.info(String.format("Adding %s to classpath", destJarFile));
    DistributedCache.addFileToClassPath(destJarFile, conf, this.fs);
  } else {
    LOG.error("Failed to upload jar file: " + status.getPath());
Contributor

I don't find the prior code throwing an error...

nonetheless, should everything continue on w/ just some error logs?

shouldn't we instead fail the overall job because presumably necessary jars won't be there?

Contributor Author

Possibly we should throw an error, but given that another job could be uploading the same jars, it might be better to let the job attempt to run. If that job fails, it should emit the failed event anyway.

* @return
* @throws IOException
*/
public static boolean uploadJarToHdfs(FileSystem fs, FileStatus localJar, int jarFileMaximumRetry, Path destJarFile) throws IOException {
Contributor

jarFileMaximumRetry => simply maxAttempts?
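
One reason the rename matters: "maximum retry" can be read as the number of tries after the first, whereas maxAttempts counts every try including the first. A minimal sketch of a loop bounded by total attempts follows. The UploadAttempt interface and uploadWithRetries method are hypothetical stand-ins, not the PR's uploadJarToHdfs implementation, whose body is not shown here.

```java
import java.io.IOException;

// Hypothetical sketch: run an upload-like action at most maxAttempts times in total.
class RetryingUpload {
  // Stand-in for a single upload attempt; not a Hadoop or Gobblin interface.
  interface UploadAttempt {
    boolean tryOnce() throws IOException;
  }

  // Returns true as soon as one attempt succeeds, false once all attempts are used up.
  static boolean uploadWithRetries(UploadAttempt attempt, int maxAttempts) {
    for (int i = 1; i <= maxAttempts; i++) {
      try {
        if (attempt.tryOnce()) {
          return true;
        }
      } catch (IOException e) {
        System.err.println("Attempt " + i + " of " + maxAttempts + " failed: " + e.getMessage());
      }
    }
    return false;
  }

  public static void main(String[] args) {
    // An attempt that always fails, to show that exactly maxAttempts tries are made.
    System.out.println(uploadWithRetries(() -> false, 3));
  }
}
```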

}
if (this.jarCacheEnabled) {
  addContainerLocalResources(new Path(jarCacheDir, GobblinYarnConfigurationKeys.LIB_JARS_DIR_NAME), resourceMap);
  if (this.config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_FILES_LOCAL_KEY)) {
Contributor

I don't quite understand this key. you're checking it here in two different conditionals, but in neither one do you actually use (or even check to see) what value it holds

Contributor Author

Oops, I should have used the gobblin.yarn.container.jars key

- this.appLauncherMode = ConfigUtils.getString(config, GOBBLIN_YARN_APP_LAUNCHER_MODE, DEFAULT_GOBBLIN_YARN_APP_LAUNCHER_MODE);
+ this.appLauncherMode = ConfigUtils.getString(this.config, GOBBLIN_YARN_APP_LAUNCHER_MODE, DEFAULT_GOBBLIN_YARN_APP_LAUNCHER_MODE);
+ this.jarCacheEnabled = ConfigUtils.getBoolean(config, GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED_DEFAULT);
Contributor

NBD, but you just updated the two above to be this.config, but only use config here :)

}
if (resourceMap.isPresent()) {
YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath, LocalResourceType.FILE, resourceMap.get());
LOGGER.warn("Failed to upload jar file {} to HDFS", srcFilePath);
Contributor

again, should this be an actual failure, not merely logging?

...or do we believe there are times when it's actually OK to continue?

Contributor Author

I think the concern is valid, but currently I'm trying to get parity with the MR job launcher. Since concurrent executions could always be adding the same jars, it can be worthwhile to just attempt the job; it will fail loudly anyway if the jars weren't uploaded properly.

Contributor

@phet phet Sep 12, 2024

I guess that's fine to start. how about a

// TODO: decide whether to fail-fast here, given the job may be unable to run w/o it
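
To make the trade-off in this thread concrete, here is a small sketch of the two policies side by side: log and continue (the behavior kept in this PR) versus fail fast (the option left as a TODO). The class UploadFailurePolicy, the failFast flag, and the use of IllegalStateException are assumptions for illustration; the PR does not define such a switch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the two failure-handling policies discussed above.
class UploadFailurePolicy {
  // uploaded.get(i) says whether jars.get(i) was uploaded successfully.
  // With failFast, the first failure aborts; otherwise failures are logged and collected.
  static List<String> handleResults(List<String> jars, List<Boolean> uploaded, boolean failFast) {
    List<String> failed = new ArrayList<>();
    for (int i = 0; i < jars.size(); i++) {
      if (!uploaded.get(i)) {
        if (failFast) {
          throw new IllegalStateException("Failed to upload jar file " + jars.get(i));
        }
        System.err.println("WARN: Failed to upload jar file " + jars.get(i) + " to HDFS");
        failed.add(jars.get(i));
      }
    }
    return failed;
  }

  public static void main(String[] args) {
    List<String> jars = List.of("a.jar", "b.jar");
    // Log-and-continue: the caller still learns which jars are missing.
    System.out.println(handleResults(jars, List.of(true, false), false));
  }
}
```

Returning the list of failed jars keeps the lenient path observable, so a later change to fail fast would only need to flip the flag.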

if (this.jarCacheEnabled) {
  Path jarCachePath = YarnHelixUtils.calculateJarCachePath(this.config);
  // Retain at least the current and last month's jars to handle executions running for ~30 days max
  boolean cleanedSuccessfully = YarnHelixUtils.retainKLatestJarCachePaths(jarCachePath.getParent(), 2, this.fs);
Contributor

does this run before or after caching jars? e.g. do we keep only the two prior AND THEN potentially add one more, or has any new one already been added before retention pares it down to two?

Contributor Author

It runs after caching the jars. But it uses a consistent YARN_APPLICATION_LAUNCHER_START_TIME_KEY within the job, so no matter how many times we compute the cache path, at most one path is created, and that path is the one the jars are being uploaded to.

Contributor

would be great to add this info to a code comment here (in the caller, since it's the combination of the two being used, so less effective in either's javadoc). the crux is that retention of K=2 won't save us in cases where K=3 might exceed any FS quotas.
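
The point about a consistent start time can be sketched as follows: if the cache directory is derived from one fixed timestamp, every call during a launcher run yields the same path, so at most one new directory appears before retention runs. The class MonthlyCachePath, the method cachePathFor, and the "yyyy-MM" directory naming are assumptions for this sketch; the actual layout produced by calculateJarCachePath is not shown in this conversation.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical sketch: derive a monthly cache directory from a fixed launcher start time.
class MonthlyCachePath {
  // Assumed naming scheme; the real directory format may differ.
  private static final DateTimeFormatter MONTH =
      DateTimeFormatter.ofPattern("yyyy-MM").withZone(ZoneOffset.UTC);

  // The same start time always maps to the same directory, no matter how often this is called.
  static String cachePathFor(String cacheRoot, long launcherStartMillis) {
    return cacheRoot + "/" + MONTH.format(Instant.ofEpochMilli(launcherStartMillis));
  }

  public static void main(String[] args) {
    long start = Instant.parse("2024-09-16T17:47:00Z").toEpochMilli();
    System.out.println(cachePathFor("/cache", start));
  }
}
```

Because retention runs after the upload, the file system can briefly hold K+1 directories (here three) before the oldest is removed, which is the quota concern raised above.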

Comment on lines 232 to 233
if (jarDirs.size() > k) {
  return fs.delete(jarDirs.get(0).getPath(), true);
Contributor

since this is not a loop, it seems it would delete at most one dir even if the number of dirs exceeds k by more than one. is that ok? if so, document in javadoc

Contributor Author

Changed it to use a loop, for consistency with naming convention.
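
A minimal sketch of what "retain the K latest" implies once a loop is used: everything except the K newest entries becomes a deletion candidate, however many extra directories exist. The class RetainLatest and method dirsToDelete are hypothetical, and the sketch assumes directory names sort chronologically (for example "2024-07" before "2024-08"); it works on plain strings rather than on a Hadoop FileSystem.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: pick the directories to delete so that only the k latest remain.
class RetainLatest {
  // Assumes names sort chronologically when sorted lexicographically.
  static List<String> dirsToDelete(List<String> dirNames, int k) {
    List<String> sorted = new ArrayList<>(dirNames);
    Collections.sort(sorted);
    // Number of directories beyond the k we want to keep (never negative).
    int excess = Math.max(0, sorted.size() - k);
    return new ArrayList<>(sorted.subList(0, excess));
  }

  public static void main(String[] args) {
    System.out.println(dirsToDelete(List.of("2024-09", "2024-06", "2024-08", "2024-07"), 2));
  }
}
```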

@codecov-commenter

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 55.31%. Comparing base (e501b62) to head (72eb403).
Report is 23 commits behind head on master.

Additional details and impacted files
@@             Coverage Diff              @@
##             master    #4030      +/-   ##
============================================
+ Coverage     45.86%   55.31%   +9.45%     
+ Complexity     3257     1582    -1675     
============================================
  Files           707      307     -400     
  Lines         27865    10580   -17285     
  Branches       2796     1069    -1727     
============================================
- Hits          12779     5852    -6927     
+ Misses        14008     4223    -9785     
+ Partials       1078      505     -573     


@Will-Lo Will-Lo force-pushed the cache-temporal-jars branch from 23a9b21 to 88fdddc on September 16, 2024 17:47
Comment on lines 42 to 49
* @param localJarPath
* @param unsharedJarsDir
* @param jarCacheDir
* @return
* @throws IOException
*/
- public static Path calculateDestJarFile(FileSystem fs, FileStatus localJar, Path unsharedJarsDir, Path jarCacheDir) throws IOException {
-   Path uploadDir = localJar.getPath().getName().contains("SNAPSHOT") ? unsharedJarsDir : jarCacheDir;
-   Path destJarFile = new Path(fs.makeQualified(uploadDir), localJar.getPath().getName());
+ public static Path calculateDestJarFilePath(FileSystem fs, String localJarPath, Path unsharedJarsDir, Path jarCacheDir) throws IOException {
+   Path uploadDir = localJarPath.contains("SNAPSHOT") ? unsharedJarsDir : jarCacheDir;
Contributor

seeing this invoked, localJarPath looks more like localJarBasename, not the (full) path.

(or simply jarName)
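
The naming distinction has a practical side: a contains("SNAPSHOT") check on a full path would also match a parent directory that happens to include that string, while a check on the basename would not. The sketch below routes by file name only. DestDirChooser and chooseUploadDir are hypothetical names, and java.nio.file.Paths stands in for Hadoop's Path purely to keep the example self-contained.

```java
import java.nio.file.Paths;

// Hypothetical sketch: choose the upload directory from the jar's basename.
class DestDirChooser {
  // Only the file name is inspected, so "SNAPSHOT" in a parent directory does not count.
  static String chooseUploadDir(String localJarPath, String unsharedJarsDir, String jarCacheDir) {
    String jarName = Paths.get(localJarPath).getFileName().toString();
    return jarName.contains("SNAPSHOT") ? unsharedJarsDir : jarCacheDir;
  }

  public static void main(String[] args) {
    // Illustrative paths only.
    System.out.println(chooseUploadDir("/libs/lib-1.0-SNAPSHOT.jar", "/unshared", "/cache"));
    System.out.println(chooseUploadDir("/builds/SNAPSHOT-area/lib-1.0.jar", "/unshared", "/cache"));
  }
}
```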

- return fs.delete(jarDirs.get(0).getPath(), true);
+ boolean deletesSuccessful = true;
+ for (int i = 0; i < jarDirs.size() - k; i++) {
+   deletesSuccessful = deletesSuccessful && fs.delete(jarDirs.get(i).getPath(), true);
Contributor

&=
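
Beyond brevity, the two forms behave differently in Java: `a = a && f()` short-circuits, so once one delete fails the remaining deletes are never attempted, while `a &= f()` always evaluates the right-hand side. Whether the reviewer had this difference in mind is not stated. The sketch below demonstrates it with a Predicate standing in for fs.delete; DeleteAll and both method names are hypothetical.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch contrasting && accumulation with &= accumulation.
class DeleteAll {
  // Short-circuit: after the first failure, && skips the remaining delete calls.
  static boolean deleteWithShortCircuit(List<String> dirs, Predicate<String> delete) {
    boolean ok = true;
    for (String d : dirs) {
      ok = ok && delete.test(d);
    }
    return ok;
  }

  // Non-short-circuit: &= evaluates delete.test(d) for every directory.
  static boolean deleteAllAttempted(List<String> dirs, Predicate<String> delete) {
    boolean ok = true;
    for (String d : dirs) {
      ok &= delete.test(d);
    }
    return ok;
  }

  public static void main(String[] args) {
    // A fake delete that fails only for "a" and logs every call it receives.
    Predicate<String> fakeDelete = d -> {
      System.out.println("delete called for " + d);
      return !d.equals("a");
    };
    System.out.println(deleteWithShortCircuit(List.of("a", "b", "c"), fakeDelete));
    System.out.println(deleteAllAttempted(List.of("a", "b", "c"), fakeDelete));
  }
}
```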

@phet phet left a comment

Great work William - I can't wait to get this in and streamline every startup!

@Will-Lo Will-Lo merged commit a07a4f1 into apache:master Sep 17, 2024
@Will-Lo Will-Lo deleted the cache-temporal-jars branch September 17, 2024 21:15